| Data Engineering Task | Python API Example | SQL Example |
|---|---|---|
| Load Data | `df = spark.read.csv("data.csv", header=True)` | `CREATE OR REPLACE TEMPORARY VIEW data USING csv OPTIONS (path 'data.csv', header 'true')` |
| Data Cataloging | `spark.catalog.listTables("my_database")` | `SHOW TABLES IN my_database` |
| Table History / Audit (Delta Lake) | `DeltaTable.forName(spark, "my_table").history()` (from `delta.tables`) | `DESCRIBE HISTORY my_table` |
| Streaming Data Processing | `df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "topic1").load()` | `CREATE OR REFRESH STREAMING TABLE data_stream AS SELECT * FROM STREAM read_kafka(bootstrapServers => 'localhost:9092', subscribe => 'topic1')` (Databricks SQL) |
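
To run SQL against a DataFrame, register it as a temporary view and query it with `spark.sql()`, which returns a new DataFrame:

```python
df.createOrReplaceTempView("data_view")
result = spark.sql("SELECT col1 FROM data_view")
```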

| Use Case | Python API | SQL |
|---|---|---|
| Load Data | `df = spark.read.csv("data.csv", header=True)` | `CREATE OR REPLACE TEMPORARY VIEW data USING csv OPTIONS (path 'data.csv', header 'true')` |
| Select Columns | `df.select("col1", "col2")` | `SELECT col1, col2 FROM data` |
| Filter Rows | `df.filter(df["col1"] > 10)` | `SELECT * FROM data WHERE col1 > 10` |
| Add Column | `df.withColumn("new_col", df["col1"] * 2)` | `SELECT *, col1 * 2 AS new_col FROM data` |
| Group & Aggregate | `df.groupBy("col1").agg(F.sum("col2").alias("total"))` | `SELECT col1, SUM(col2) AS total FROM data GROUP BY col1` |
| Join Tables | `df1.join(df2, df1["key"] == df2["key"], "inner")` | `SELECT * FROM df1 INNER JOIN df2 ON df1.key = df2.key` |
| Sort Data | `df.orderBy(df["col1"].desc())` | `SELECT * FROM data ORDER BY col1 DESC` |
| Write Data | `df.write.mode("overwrite").parquet("output.parquet")` | `INSERT OVERWRITE DIRECTORY 'output.parquet' USING parquet SELECT * FROM data` |

`F` refers to `pyspark.sql.functions` (`from pyspark.sql import functions as F`). The SQL join assumes `df1` and `df2` are registered as temporary views.

The two approaches can be mixed: `df.selectExpr()` accepts SQL expressions inside the DataFrame API.

```python
df.selectExpr("col1", "col2 AS renamed_col").show()
```
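
A bronze-layer ingest reads a Kafka topic as a stream and appends the raw records to a Delta table. The topic and paths below are placeholders:

```python
from pyspark.sql import SparkSession

# Assumes the spark-sql-kafka and delta-spark packages are available.
spark = SparkSession.builder.getOrCreate()
topic = "topic1"                                       # hypothetical placeholder
bronze_path = "/mnt/bronze/topic1"                     # hypothetical placeholder
checkpointLocation = "/mnt/checkpoints/bronze_topic1"  # hypothetical placeholder

# Kafka rows carry key/value (binary) plus topic, partition, offset and timestamp.
raw_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", topic) \
    .option("startingOffsets", "earliest") \
    .load()

# availableNow=True drains all data available at start, then stops; the
# checkpoint records consumed offsets so reruns resume where they left off.
bronze_writer_query = raw_stream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", checkpointLocation) \
    .option("path", bronze_path) \
    .trigger(availableNow=True) \
    .start()
bronze_writer_query.awaitTermination()
```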